/* Copyright (C) 2014 InfiniDB, Inc.
 * Copyright (C) 2016 MariaDB Corporation.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***********************************************************************
 *   $Id: commandpackageprocessor.cpp 9613 2013-06-12 15:44:24Z dhall $
 *
 *
 ***********************************************************************/
#include <ctime>
#include <iostream>
#include <sstream>
#include <set>
#include <vector>
#include <boost/scoped_ptr.hpp>

#include "commandpackageprocessor.h"
#include "messagelog.h"
#include "dbrm.h"
#include "sqllogger.h"
#include "tablelockdata.h"
#include "we_messages.h"
#include "we_ddlcommandclient.h"
#include "oamcache.h"
#include "liboamcpp.h"
#include "resourcemanager.h"

using namespace std;
using namespace WriteEngine;
using namespace dmlpackage;
using namespace execplan;
using namespace logging;
using namespace boost;
using namespace BRM;

namespace dmlpackageprocessor
{
// Tracks active cleartablelock commands by storing set of table lock IDs
/*static*/ std::set<uint64_t> CommandPackageProcessor::fActiveClearTableLockCmds;
/*static*/ boost::mutex CommandPackageProcessor::fActiveClearTableLockCmdMutex;

DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackageInternal(
    dmlpackage::CalpontDMLPackage& cpackage)
{
  SUMMARY_INFO("CommandPackageProcessor::processPackage");

  DMLResult result;
  result.result = NO_ERROR;

  VERBOSE_INFO("Processing Command DML Package...");
  std::string stmt = cpackage.get_DMLStatement();
  boost::algorithm::to_upper(stmt);
  trim(stmt);
  fSessionID = cpackage.get_SessionID();
  BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
  uint64_t uniqueId = 0;

  // Bug 5070. Added exception handling
  try
  {
    uniqueId = fDbrm->getUnique64();
  }
  catch (std::exception& ex)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add(ex.what());
    message.format(args);
    result.result = COMMAND_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }
  catch (...)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("Unknown error occurred while getting unique number.");
    message.format(args);
    result.result = COMMAND_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }

  string errorMsg;
  bool queRemoved = false;
  logging::LoggingID lid(20);
  logging::MessageLog ml(lid);
  LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
  logging::Message::Args args1;
  logging::Message msg(1);
  Logger logger(logid.fSubsysID);

  if (stmt != "CLEANUP")
  {
    args1.add("Start SQL statement: ");
    args1.add(stmt);
    msg.format(args1);
    logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
  }

  // fWEClient->addQueue(uniqueId);
  try
  {
    // set-up the transaction
    if ((stmt == "COMMIT") || (stmt == "ROLLBACK"))
    {
      // SQLLogger sqlLogger(stmt, DMLLoggingId, cpackage.get_SessionID(), txnid.id);

      if ((txnid.valid))
      {
        vector<LBID_t> lbidList;
        fDbrm->getUncommittedExtentLBIDs(static_cast<VER_t>(txnid.id), lbidList);
        bool cpInvalidated = false;

        // cout << "get a valid txnid " << txnid.id << " and stmt is " << stmt << " and isBachinsert is " <<
        // cpackage.get_isBatchInsert() << endl;
        if ((stmt == "COMMIT") && (cpackage.get_isBatchInsert()))
        {
          // update syscolumn for the next value
          boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
              CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
          CalpontSystemCatalog::TableName tableName;
          tableName = systemCatalogPtr->tableName(cpackage.getTableOid());

          try
          {
            uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);

            if (nextVal != AUTOINCR_SATURATED)  // need to update syscolumn
            {
              // get autoincrement column oid
              int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
              // get the current nextVal from controller
              scoped_ptr<DBRM> aDbrm(new DBRM());
              uint64_t nextValInController = 0;
              bool validNextVal = false;
              aDbrm->getAILock(columnOid);
              nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
              validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);

              if ((validNextVal) && (nextValInController > nextVal))
              {
                fWEClient->removeQueue(uniqueId);
                queRemoved = true;
                WE_DDLCommandClient ddlClient;
                uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController);
                //@bug 5894. Need release lock.
                aDbrm->releaseAILock(columnOid);

                if (rc != 0)
                  throw std::runtime_error("Error in UpdateSyscolumnNextval");
              }
              else
                aDbrm->releaseAILock(columnOid);
            }
          }
          catch (std::exception& ex)
          {
            // Rollback transaction
            rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
            fSessionManager.rolledback(txnid);
            throw std::runtime_error(ex.what());
          }

          // systemCatalogPtr->updateColinfoCache(nextValMap);
          int weRc = 0;

          if (cpackage.get_isAutocommitOn())
          {
            weRc = commitBatchAutoOnTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);

            if (weRc != 0)
              BRM::errString(weRc, errorMsg);

            cpInvalidated = true;
          }
          else
          {
            weRc = fDbrm->vbCommit(txnid.id);

            if (weRc != 0)
              BRM::errString(weRc, errorMsg);

            // weRc = commitBatchAutoOffTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
          }

          if (weRc != 0)
          {
            throw std::runtime_error(errorMsg);
          }

          logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
          fSessionManager.committed(txnid);
          // cout << "releasing  transaction id for batchinsert" <<  txnid.id << endl;
        }
        else if (stmt == "COMMIT")
        {
          // cout << "success in commitTransaction" << endl;
          // update syscolumn for the next value
          boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
              CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
          CalpontSystemCatalog::TableName tableName;
          uint32_t tableOid = cpackage.getTableOid();
          std::vector<TableLockInfo> tableLocks = fDbrm->getAllTableLocks();

          if (tableOid == 0)  // special case: transaction commit for autocommit off and not following a dml
                              // statement immediately
          {
            TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
            TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
            TablelockData::OIDTablelock::iterator iter;

            if (!tablelockMap.empty())
            {
              for (unsigned k = 0; k < tableLocks.size(); k++)
              {
                iter = tablelockMap.find(tableLocks[k].tableOID);

                if (iter != tablelockMap.end())
                {
                  tableName = systemCatalogPtr->tableName(tableLocks[k].tableOID);

                  try
                  {
                    uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);

                    if (nextVal != AUTOINCR_SATURATED)  // neet to update syscolumn
                    {
                      // get autoincrement column oid
                      int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
                      // get the current nextVal from controller
                      scoped_ptr<DBRM> aDbrm(new DBRM());
                      uint64_t nextValInController = 0;
                      bool validNextVal = false;
                      aDbrm->getAILock(columnOid);
                      nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
                      validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);

                      if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
                      {
                        fWEClient->removeQueue(uniqueId);

                        queRemoved = true;
                        WE_DDLCommandClient ddlClient;
                        uint8_t rc =
                            ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
                        aDbrm->releaseAILock(columnOid);

                        if (rc != 0)
                        {
                          // for now
                          fSessionManager.committed(txnid);
                          throw std::runtime_error("Error in UpdateSyscolumnNextval");
                        }
                      }
                      else
                        aDbrm->releaseAILock(columnOid);
                    }
                  }
                  catch (std::exception& ex)
                  {
                    // Rollback transaction, release tablelock
                    rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
                    fDbrm->releaseTableLock(iter->second);
                    fSessionManager.rolledback(txnid);
                    throw std::runtime_error(ex.what());
                  }
                }
              }
            }
          }
          else
          {
            if (tableOid >= 3000)
            {
              tableName = systemCatalogPtr->tableName(tableOid);

              try
              {
                uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);

                if (nextVal != AUTOINCR_SATURATED)  // need to update syscolumn
                {
                  // get autoincrement column oid
                  int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
                  // get the current nextVal from controller
                  scoped_ptr<DBRM> aDbrm(new DBRM());
                  uint64_t nextValInController = 0;
                  bool validNextVal = false;
                  aDbrm->getAILock(columnOid);
                  nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
                  validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);

                  if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
                  {
                    fWEClient->removeQueue(uniqueId);

                    queRemoved = true;
                    WE_DDLCommandClient ddlClient;
                    uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
                    aDbrm->releaseAILock(columnOid);

                    if (rc != 0)
                    {
                      // for now
                      fSessionManager.committed(txnid);
                      throw std::runtime_error("Error in UpdateSyscolumnNextval");
                    }
                  }
                  else
                    aDbrm->releaseAILock(columnOid);
                }
              }
              catch (std::exception& ex)
              {
                // Rollback transaction, release tablelock
                rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);

                for (unsigned k = 0; k < tableLocks.size(); k++)
                {
                  if (tableLocks[k].tableOID == tableOid)
                  {
                    try
                    {
                      fDbrm->releaseTableLock(tableLocks[k].id);
                    }
                    catch (std::exception&)
                    {
                    }
                  }
                }

                fSessionManager.rolledback(txnid);
                throw std::runtime_error(ex.what());
              }
            }
          }

          int weRc = commitTransaction(uniqueId, txnid);
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");

          if (weRc != WriteEngine::NO_ERROR)
          {
            // cout << "Got an error in commitTransaction" << endl;
            WErrorCodes ec;
            ostringstream oss;
            oss << "COMMIT failed: " << ec.errorString(weRc);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.committed(txnid);
          // cout << "commit releasing  transaction id " <<  txnid.id << endl;
        }
        else if ((stmt == "ROLLBACK") && (cpackage.get_isBatchInsert()))
        {
          int weRc = 0;

          // version rollback, Bulkrollback
          weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);

          if (weRc == 0)
          {
            // MCOL-5021
            fDbrm->addToLBIDList(fSessionID, lbidList);

            //@Bug 4560 invalidate cp first as bulkrollback will truncate the newly added lbids.
            fDbrm->invalidateUncommittedExtentLBIDs(0, true, &lbidList);
            cpInvalidated = true;
            weRc =
                rollBackBatchAutoOnTransaction(uniqueId, txnid, fSessionID, cpackage.getTableOid(), errorMsg);
          }
          else
          {
            throw std::runtime_error(errorMsg);
          }

          logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");

          if (weRc != 0)
          {
            //@Bug 4524. Don't set to readonly. Just error out.
            WErrorCodes ec;
            ostringstream oss;
            oss << "ROLLBACK batch insert failed due to: "
                << "(" << weRc << ")" << ec.errorString(weRc);
            // Log to error log
            logging::Message::Args args1;
            logging::Message message1(2);
            args1.add(oss.str());
            message1.format(args1);
            ml.logErrorMessage(message1);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.rolledback(txnid);
          // cout << "batch rollback releasing  transaction id " <<  txnid.id << endl;
        }
        else if (stmt == "ROLLBACK")
        {
          std::string errorMsg("");
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
          int weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);

          if (weRc != 0)
          {
            // cout << "Rollback failed" << endl;
            //@Bug 4524. Don't set to readonly. Just error out.
            ostringstream oss;
            oss << "ROLLBACK failed due to: " << errorMsg;
            // Log to error log
            logging::Message::Args args2;
            logging::Message message2(2);
            args2.add(oss.str());
            message2.format(args2);
            ml.logErrorMessage(message2);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.rolledback(txnid);
          // cout << "Rollback releasing  transaction id " <<  txnid.id << endl;
        }

        if (!cpInvalidated)
        {
          // MCOL-5021
          if (stmt == "ROLLBACK")
          {
            fDbrm->addToLBIDList(fSessionID, lbidList);
          }

          fDbrm->invalidateUncommittedExtentLBIDs(0, stmt == "ROLLBACK", &lbidList);
        }
      }
    }
    else if (stmt == "CLEANUP")
    {
      execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(cpackage.get_SessionID());
      execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(cpackage.get_SessionID() | 0x80000000);
    }
    else if (stmt == "VIEWTABLELOCK")
    {
      viewTableLock(cpackage, result);
    }
    else if (stmt == "CLEARTABLELOCK")
    {
      clearTableLock(uniqueId, cpackage, result);
    }
    else if (!cpackage.get_Logging())
    {
      BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
      logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_DMLStatement() + ";",
                      cpackage.get_SchemaName());
      SQLLogger sqlLogger(cpackage.get_DMLStatement(), DMLLoggingId, fSessionID, txnid.id);
      // cout << "commandpackageprocessor Logging " << cpackage.get_DMLStatement()+ ";" << endl;
    }
    else
    {
      std::string err = "Unknown command.";
      SUMMARY_INFO(err);
      throw std::runtime_error(err);
    }
  }
  catch (logging::IDBExcept& noTable)  //@Bug 2606 catch no table found exception
  {
    cerr << "CommandPackageProcessor::processPackage: " << noTable.what() << endl;

    result.result = COMMAND_ERROR;
    result.message = Message(noTable.what());
  }
  catch (std::exception& ex)
  {
    if (checkPPLostConnection(ex))
    {
      result.result = PP_LOST_CONNECTION;
    }
    else
    {
      cerr << "CommandPackageProcessor::processPackage: " << ex.what() << endl;

      logging::Message::Args args;
      logging::Message message(1);
      args.add(ex.what());
      args.add("");
      args.add("");
      message.format(args);

      result.result = COMMAND_ERROR;
      result.message = message;
    }
  }
  catch (...)
  {
    cerr << "CommandPackageProcessor::processPackage: caught unknown exception!" << endl;
    logging::Message::Args args;
    logging::Message message(1);
    args.add("Command Failed: ");
    args.add("encountered unkown exception");
    args.add("");
    args.add("");
    message.format(args);

    result.result = COMMAND_ERROR;
    result.message = message;
  }

  if (!queRemoved)
    fWEClient->removeQueue(uniqueId);

  // release table lock
  if ((result.result == NO_ERROR) && ((stmt == "COMMIT") || (stmt == "ROLLBACK")))
  {
    TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
    TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
    bool lockReleased = true;

    if (!tablelockMap.empty())
    {
      TablelockData::OIDTablelock::iterator it = tablelockMap.begin();

      while (it != tablelockMap.end())
      {
        try
        {
          lockReleased = fDbrm->releaseTableLock(it->second);
          // cout << "releasing tablelock " << it->second << endl;
        }
        catch (std::exception& ex)
        {
          logging::Message::Args args;
          logging::Message message(1);
          args.add(ex.what());
          args.add("");
          args.add("");
          message.format(args);

          result.result = COMMAND_ERROR;
          result.message = message;
        }

        if (!lockReleased)  // log an error
        {
          ostringstream os;
          os << "tablelock for table oid " << it->first << " is not found";
          logging::Message::Args args;
          logging::Message message(1);
          args.add(os.str());
          args.add("");
          args.add("");
          message.format(args);
          logging::LoggingID lid(21);
          logging::MessageLog ml(lid);

          ml.logErrorMessage(message);
        }

        // cout << "tablelock " << it->second << " is released" << endl;
        it++;
      }

      //@Bug 3557. Clean tablelock cache after commit/rollback.
      TablelockData::removeTablelockData(cpackage.get_SessionID());
    }
  }

  VERBOSE_INFO("Finished processing Command DML Package");
  // LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
  logging::Message::Args args2;
  logging::Message msg1(1);

  if (stmt != "CLEANUP")
  {
    args2.add("End SQL statement");
    msg1.format(args2);
    // Logger logger(logid.fSubsysID);
    logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
  }

  return result;
}

//------------------------------------------------------------------------------
// Process viewtablelock command; return table lock information for the
// specified table.
// This function closely resembles printTableLocks in viewtablelock.cpp.
//------------------------------------------------------------------------------
void CommandPackageProcessor::viewTableLock(const dmlpackage::CalpontDMLPackage& cpackage,
                                            DMLPackageProcessor::DMLResult& result)
{
  // Initialize System Catalog object used to get table name
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = cpackage.get_SchemaName();
  tableName.table = cpackage.get_TableName();
  execplan::CalpontSystemCatalog::ROPair roPair;

  roPair = systemCatalogPtr->tableRID(tableName);

  // Get list of table locks for the requested table
  std::vector<BRM::TableLockInfo> tableLocks;
  tableLocks = fDbrm->getAllTableLocks();

  // Make preliminary pass through the table locks in order to determine our
  // output column widths based on the data.  Min column widths are based on
  // the width of the column heading (except for the 'state' column).
  uint64_t maxLockID = 0;
  uint32_t maxPID = 0;
  int32_t maxSessionID = 0;
  int32_t minSessionID = 0;
  int32_t maxTxnID = 0;

  unsigned int lockIDColumnWidth = 6;       // "LockID"
  unsigned int ownerColumnWidth = 7;        // "Process"
  unsigned int pidColumnWidth = 3;          // "PID"
  unsigned int sessionIDColumnWidth = 7;    // "Session"
  unsigned int txnIDColumnWidth = 3;        // "Txn"
  unsigned int createTimeColumnWidth = 12;  // "CreationTime"
  unsigned int pmColumnWidth = 7;           // "DBRoots"
  std::vector<std::string> createTimes;
  char cTimeBuffer[1024];

  for (unsigned idx = 0; idx < tableLocks.size(); idx++)
  {
    if (tableLocks[idx].id > maxLockID)
      maxLockID = tableLocks[idx].id;

    if (tableLocks[idx].ownerName.length() > ownerColumnWidth)
      ownerColumnWidth = tableLocks[idx].ownerName.length();

    if (tableLocks[idx].ownerPID > maxPID)
      maxPID = tableLocks[idx].ownerPID;

    if (tableLocks[idx].ownerSessionID > maxSessionID)
      maxSessionID = tableLocks[idx].ownerSessionID;

    if (tableLocks[idx].ownerSessionID < minSessionID)
      minSessionID = tableLocks[idx].ownerSessionID;

    if (tableLocks[idx].ownerTxnID > maxTxnID)
      maxTxnID = tableLocks[idx].ownerTxnID;

    ctime_r(&tableLocks[idx].creationTime, cTimeBuffer);
    cTimeBuffer[strlen(cTimeBuffer) - 1] = '\0';  // strip trailing '\n'
    std::string cTimeStr(cTimeBuffer);

    if (cTimeStr.length() > createTimeColumnWidth)
      createTimeColumnWidth = cTimeStr.length();

    createTimes.push_back(cTimeStr);

    std::ostringstream pms;  // It is dbroots now

    for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++)
    {
      if (k > 0)
        pms << ',';

      pms << tableLocks[idx].dbrootList[k];
    }

    if (pms.str().length() > pmColumnWidth)
      pmColumnWidth = pms.str().length();
  }

  ownerColumnWidth += 2;
  pmColumnWidth += 2;
  createTimeColumnWidth += 2;

  std::ostringstream idString;
  idString << maxLockID;

  if (idString.str().length() > lockIDColumnWidth)
    lockIDColumnWidth = idString.str().length();

  lockIDColumnWidth += 2;

  std::ostringstream pidString;
  pidString << maxPID;

  if (pidString.str().length() > pidColumnWidth)
    pidColumnWidth = pidString.str().length();

  pidColumnWidth += 2;

  const std::string sessionNoneStr("BulkLoad");
  std::ostringstream sessionString;
  sessionString << maxSessionID;

  if (sessionString.str().length() > sessionIDColumnWidth)
    sessionIDColumnWidth = sessionString.str().length();

  if ((minSessionID < 0) && (sessionNoneStr.length() > sessionIDColumnWidth))
    sessionIDColumnWidth = sessionNoneStr.length();

  sessionIDColumnWidth += 2;

  const std::string txnNoneStr("n/a");
  std::ostringstream txnString;
  txnString << maxTxnID;

  if (txnString.str().length() > txnIDColumnWidth)
    txnIDColumnWidth = txnString.str().length();

  txnIDColumnWidth += 2;

  // Make second pass through the table locks to build our result.
  // Keep in mind the same table could have more than 1 lock
  // (on different PMs), so we don't exit loop after "first" match.
  bool found = false;
  ostringstream os;

  for (unsigned idx = 0; idx < tableLocks.size(); idx++)
  {
    if (roPair.objnum == (CalpontSystemCatalog::OID)tableLocks[idx].tableOID)
    {
      std::ostringstream pms;

      for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++)
      {
        if (k > 0)
          pms << ',';

        pms << tableLocks[idx].dbrootList[k];
      }

      if (found)  // write newline between lines
      {
        os << endl;
      }
      else  // write the column headers before the first entry
      {
        os.setf(ios::left, ios::adjustfield);
        os << setw(lockIDColumnWidth) << "LockID" << setw(ownerColumnWidth) << "Process"
           << setw(pidColumnWidth) << "PID" << setw(sessionIDColumnWidth) << "Session"
           << setw(txnIDColumnWidth) << "Txn" << setw(createTimeColumnWidth) << "CreationTime" << setw(9)
           << "State" << setw(pmColumnWidth) << "DBRoots" << endl;
      }

      os << "  " << setw(lockIDColumnWidth) << tableLocks[idx].id << setw(ownerColumnWidth)
         << tableLocks[idx].ownerName << setw(pidColumnWidth) << tableLocks[idx].ownerPID;

      // Log session ID, or "BulkLoad" if session is -1
      if (tableLocks[idx].ownerSessionID < 0)
        os << setw(sessionIDColumnWidth) << sessionNoneStr;
      else
        os << setw(sessionIDColumnWidth) << tableLocks[idx].ownerSessionID;

      // Log txn ID, or "n/a" if txn is -1
      if (tableLocks[idx].ownerTxnID < 0)
        os << setw(txnIDColumnWidth) << txnNoneStr;
      else
        os << setw(txnIDColumnWidth) << tableLocks[idx].ownerTxnID;

      os << setw(createTimeColumnWidth) << createTimes[idx] << setw(9)
         << ((tableLocks[idx].state == BRM::LOADING) ? "LOADING" : "CLEANUP") << setw(pmColumnWidth)
         << pms.str();

      found = true;
    }  // end of displaying a table lock match
  }    // end of loop through all table locks

  if (!found)
  {
    os << " Table " << tableName.schema << "." << tableName.table << " is not locked by any process.";
  }

  result.tableLockInfo = os.str();
}

//------------------------------------------------------------------------------
// Process cleartablelock command; execute bulk rollback and release table lock
// for the specified table.
//------------------------------------------------------------------------------
void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage::CalpontDMLPackage& cpackage,
                                             DMLPackageProcessor::DMLResult& result)
{
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = cpackage.get_SchemaName();
  tableName.table = cpackage.get_TableName();

  // Get the Table lock ID that is passed in the SQL statement attribute.
  // This is a kludge we may want to consider changing.  Could add a table
  // lock ID attribute to the CalpontDMLPackage object.
  std::istringstream lockIDString(cpackage.get_SQLStatement());
  uint64_t tableLockID;
  lockIDString >> tableLockID;

  //----------------------------------------------------- start of syslog code
  //
  // Log initiation of cleartablelock to syslog
  //
  const std::string APPLNAME("cleartablelock SQL cmd");
  const int SUBSYSTEM_ID = 21;  // dmlpackageproc
  const int INIT_MSG_NUM = logging::M0088;
  logging::Message::Args msgArgs;
  logging::Message logMsg1(INIT_MSG_NUM);
  msgArgs.add(APPLNAME);
  msgArgs.add(tableName.toString());
  msgArgs.add(tableLockID);
  logMsg1.format(msgArgs);
  logging::LoggingID lid(SUBSYSTEM_ID);
  logging::MessageLog ml(lid);
  ml.logInfoMessage(logMsg1);
  //------------------------------------------------------- end of syslog code

  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  messageqcpp::ByteStream bsOut;
  bool lockGrabbed = false;
  bool bErrFlag = false;
  bool bRemoveMetaErrFlag = false;
  std::ostringstream combinedErrMsg;

  try
  {
    // Make sure BRM is in READ-WRITE state before starting
    int brmRc = fDbrm->isReadWrite();

    if (brmRc != BRM::ERR_OK)
    {
      std::string brmErrMsg;
      BRM::errString(brmRc, brmErrMsg);
      std::ostringstream oss;
      oss << "Failed BRM status check: " << brmErrMsg;
      throw std::runtime_error(oss.str());
    }

    BRM::TableLockInfo lockInfo;
    establishTableLockToClear(tableLockID, lockInfo);
    lockGrabbed = true;

    oam::OamCache* oamCache = oam::OamCache::makeOamCache();
    oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap();
    std::map<int, int>::const_iterator mapIter;
    std::set<int> pmSet;

    // Construct relevant list of PMs based on the DBRoots associated
    // with the tableLock.
    for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++)
    {
      mapIter = dbRootPmMap->find(lockInfo.dbrootList[k]);

      if (mapIter != dbRootPmMap->end())
      {
        int pm = mapIter->second;
        pmSet.insert(pm);
      }
      else
      {
        std::ostringstream oss;
        oss << "DBRoot " << lockInfo.dbrootList[k] << " does not map to a PM.  Cannot perform rollback";
        throw std::runtime_error(oss.str());
      }
    }

    std::vector<int> pmList;

    for (std::set<int>::const_iterator setIter = pmSet.begin(); setIter != pmSet.end(); ++setIter)
    {
      pmList.push_back(*setIter);
    }

    std::cout << "cleartablelock rollback for table lock " << tableLockID << " being forwarded to PM(s): ";

    for (unsigned int k = 0; k < pmList.size(); k++)
    {
      if (k > 0)
        std::cout << ", ";

      std::cout << pmList[k];
    }

    std::cout << std::endl;

    // Perform bulk rollback if state is in LOADING state
    if (lockInfo.state == BRM::LOADING)
    {
      fWEClient->addQueue(uniqueId);

      //------------------------------------------------------------------
      // Send rollback msg to the writeengine server for every PM.
      // We send to each PM, instead of just to PMs in the tablelock's
      // PM list, just in case a DBRoot has been moved to a different PM.
      //------------------------------------------------------------------
      //			std::cout << "cleartablelock rollback: tableLock-" << tableLockID <<
      //				": uniqueID-" << uniqueId             <<
      //				": oid-"      << lockInfo.tableOID    <<
      //				"; name-"     << tableName.toString() <<
      //				"; app-"      << APPLNAME             << std::endl;

      bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK;
      bsOut << uniqueId;
      bsOut << tableLockID;
      bsOut << lockInfo.tableOID;
      bsOut << tableName.toString();
      bsOut << APPLNAME;

      for (unsigned j = 0; j < pmList.size(); j++)
      {
        fWEClient->write(bsOut, pmList[j]);
      }

      // Wait for "all" the responses, and accumulate any/all errors
      unsigned int pmMsgCnt = 0;

      while (pmMsgCnt < pmList.size())
      {
        std::string rollbackErrMsg;
        bsIn.reset(new messageqcpp::ByteStream());
        fWEClient->read(uniqueId, bsIn);

        if (bsIn->length() == 0)
        {
          bRemoveMetaErrFlag = true;

          if (combinedErrMsg.str().length() > 0)
            combinedErrMsg << std::endl;

          combinedErrMsg << "Network error, PM rollback; ";
        }
        else
        {
          messageqcpp::ByteStream::byte rc;
          uint16_t pmNum;
          *bsIn >> rc;
          *bsIn >> rollbackErrMsg;
          *bsIn >> pmNum;

          //					std::cout << "cleartablelock rollback response from PM"<<
          //						pmNum << "; rc-" << (int)rc <<
          //						"; errMsg: {" << rollbackErrMsg << '}' << std::endl;

          if (rc != 0)
          {
            bRemoveMetaErrFlag = true;

            if (combinedErrMsg.str().empty())
              combinedErrMsg << "Rollback error; ";
            else
              combinedErrMsg << std::endl;

            combinedErrMsg << "[PM" << pmNum << "] " << rollbackErrMsg;
          }
        }

        pmMsgCnt++;
      }  // end of while loop to process all responses to bulk rollback

      // If no errors so far, then change state to CLEANUP state.
      // We ignore the return stateChange flag.
      if (!bErrFlag)
      {
        fDbrm->changeState(tableLockID, BRM::CLEANUP);
      }
    }  // end of (lockInfo.state == BRM::LOADING)

    // If no errors so far, then:
    // 1. delete meta data backup rollback files
    // 2. finally release table lock
    if (!bErrFlag)
    {
      bsOut.reset();

      //------------------------------------------------------------------
      // Delete meta data backup rollback files
      //------------------------------------------------------------------
      //			std::cout << "cleartablelock cleanup: uniqueID-" << uniqueId <<
      //				": oid-" << lockInfo.tableOID << std::endl;

      bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP;
      bsOut << uniqueId;
      bsOut << lockInfo.tableOID;

      for (unsigned j = 0; j < pmList.size(); j++)
      {
        fWEClient->write(bsOut, pmList[j]);
      }

      // Wait for "all" the responses, and accumulate any/all errors
      unsigned int pmMsgCnt = 0;

      while (pmMsgCnt < pmList.size())
      {
        std::string fileDeleteErrMsg;
        bsIn.reset(new messageqcpp::ByteStream());
        fWEClient->read(uniqueId, bsIn);

        if (bsIn->length() == 0)
        {
          bRemoveMetaErrFlag = true;

          if (combinedErrMsg.str().length() > 0)
            combinedErrMsg << std::endl;

          combinedErrMsg << "Network error, PM rollback cleanup; ";
        }
        else
        {
          messageqcpp::ByteStream::byte rc;
          uint16_t pmNum;
          *bsIn >> rc;
          *bsIn >> fileDeleteErrMsg;
          *bsIn >> pmNum;

          //					std::cout << "cleartablelock cleanup response from PM" <<
          //						pmNum << "; rc-" << (int)rc <<
          //						"; errMsg: {" << fileDeleteErrMsg << '}' << std::endl;

          if (rc != 0)
          {
            bRemoveMetaErrFlag = true;

            if (combinedErrMsg.str().empty())
              combinedErrMsg << "Cleanup error; ";
            else
              combinedErrMsg << std::endl;

            combinedErrMsg << "[PM" << pmNum << "] " << fileDeleteErrMsg;
          }
        }

        pmMsgCnt++;
      }  // end of while loop to process all responses to rollback cleanup

      // We ignore return release flag from releaseTableLock().
      if (!bErrFlag)
      {
        fDbrm->releaseTableLock(tableLockID);
      }
    }
  }
  catch (std::exception& ex)
  {
    bErrFlag = true;
    combinedErrMsg << ex.what();
  }

  if (!bErrFlag)
  {
    std::ostringstream oss;
    oss << "Table lock " << tableLockID << " for table " << tableName.toString() << " is cleared.";

    //@Bug 4517. Release tablelock if remove meta files failed.
    if (bRemoveMetaErrFlag)
    {
      oss << " Warning: " << combinedErrMsg.str();
    }

    result.tableLockInfo = oss.str();
  }
  else
  {
    std::ostringstream oss;
    oss << "Table lock " << tableLockID << " for table " << tableName.toString() << " cannot be cleared.  "
        << combinedErrMsg.str();
    result.tableLockInfo = oss.str();
  }

  // Remove tableLockID out of the active cleartableLock command list
  if (lockGrabbed)
  {
    boost::mutex::scoped_lock lock(fActiveClearTableLockCmdMutex);
    fActiveClearTableLockCmds.erase(tableLockID);
  }

  //----------------------------------------------------- start of syslog code
  //
  // Log completion of cleartablelock to syslog
  //
  const int COMP_MSG_NUM = logging::M0089;
  msgArgs.reset();
  logging::Message logMsg2(COMP_MSG_NUM);
  msgArgs.add(APPLNAME);
  msgArgs.add(tableName.toString());
  msgArgs.add(tableLockID);
  std::string finalStatus;

  if (!bErrFlag)
  {
    finalStatus = "Completed successfully";
  }
  else
  {
    finalStatus = "Encountered errors: ";
    finalStatus += combinedErrMsg.str();
  }

  msgArgs.add(finalStatus);
  logMsg2.format(msgArgs);
  ml.logInfoMessage(logMsg2);
  //------------------------------------------------------- end of syslog code
}

//------------------------------------------------------------------------------
// Get/return the current tablelock info for tableLockID, and reassign the table
// lock to ourselves if we can.  If the lock is still in use by another process
// or by another DML cleartablelock thread, then we error out with exception.
//------------------------------------------------------------------------------
void CommandPackageProcessor::establishTableLockToClear(uint64_t tableLockID, BRM::TableLockInfo& lockInfo)
{
  boost::mutex::scoped_lock lock(fActiveClearTableLockCmdMutex);

  // Get current table lock info
  bool getLockInfo = fDbrm->getTableLockInfo(tableLockID, &lockInfo);

  if (!getLockInfo)
  {
    throw std::runtime_error(std::string("Lock does not exist."));
  }

  std::string processName("DMLProc clearTableLock");
  uint32_t processID = ::getpid();

  // See if another thread is executing a cleartablelock cmd for this table
  if ((lockInfo.ownerName == processName) && (lockInfo.ownerPID == processID))
  {
    std::set<uint64_t>::const_iterator it = fActiveClearTableLockCmds.find(tableLockID);

    if (it != fActiveClearTableLockCmds.end())
    {
      throw std::runtime_error(
          std::string("Lock in use.  "
                      "DML is executing another cleartablelock MySQL cmd."));
    }
  }
  else
  {
    // Take over ownership of stale lock.
    // Use "DMLProc clearTableLock" as process name to differentiate
    // from a DMLProc lock used for inserts, updates, etc.
    int32_t sessionID = fSessionID;
    int32_t txnid = -1;
    bool ownerChanged = fDbrm->changeOwner(tableLockID, processName, processID, sessionID, txnid);

    if (!ownerChanged)
    {
      throw std::runtime_error(std::string("Unable to grab lock; lock not found or still in use."));
    }
  }

  // Add this cleartablelock command to the list of active cleartablelock cmds
  fActiveClearTableLockCmds.insert(tableLockID);
}

}  // namespace dmlpackageprocessor
